package com.learn.elasticsearch.config;


import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.Assert;

/**
 * RestClientConfig作为Elasticsearch Rest客户端的配置了类：
 *
 * @author: MI
 * @email: 448341911@qq.com
 * @createTime: 2024/1/13 0:10
 * @updateUser: MI
 * @updateTime: 2024/1/13 0:10
 * @updateRemark: 修改内容
 * @version: v1.0
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(ElasticSearchProperties.class)
public class ElasticClientConfig {

    @Value("${elasticsearch.hosts}")
    public String elasticsearchHost;

    @Value("${elasticsearch.username}")
    public String elasticsearchUsername;

    @Value("${elasticsearch.password}")
    public String elasticsearchPassword;

    @Value("${elasticsearch.connection.timeout}")
    public int elasticsearchConnectionTimeout;

    @Value("${elasticsearch.socket.timeout}")
    public int elasticsearchSocketTimeout;

    @Value("${elasticsearch.connection.request.timeout}")
    public int getElasticsearchConnectionRequestTimeout;

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        RestClient restClient = RestClient.builder(getESHttpHosts()).setRequestConfigCallback(requestConfigBuilder -> {
            //设置连接超时时间
            requestConfigBuilder.setConnectTimeout(elasticsearchConnectionTimeout);
            requestConfigBuilder.setSocketTimeout(elasticsearchSocketTimeout);
            requestConfigBuilder.setConnectionRequestTimeout(getElasticsearchConnectionRequestTimeout);
            return requestConfigBuilder;
        }).setFailureListener(new RestClient.FailureListener() {
            //某节点失败,这里可以做一些告警
            @Override
            public void onFailure(Node node) {
                log.error(String.valueOf(node));
            }
        }).setHttpClientConfigCallback(httpClientBuilder -> {
            httpClientBuilder.disableAuthCaching();
            //设置账密
            return getHttpAsyncClientBuilder(httpClientBuilder);
        }).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    private HttpAsyncClientBuilder getHttpAsyncClientBuilder(HttpAsyncClientBuilder httpClientBuilder) {
        if (StringUtils.isEmpty(elasticsearchUsername) || StringUtils.isEmpty(elasticsearchPassword)) {
            return httpClientBuilder;
        }
        //账密设置
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        //es账号密码（一般使用,用户elastic）
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(elasticsearchUsername, elasticsearchPassword));
        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        return httpClientBuilder;
    }

    /**
     * ElasticSearch 连接地址
     * 多个逗号分隔
     * 示例：127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203
     */
    private HttpHost[] getESHttpHosts() {
        String[] hosts = elasticsearchHost.split(",");
        HttpHost[] httpHosts = new HttpHost[hosts.length];
        for (int i = 0; i < httpHosts.length; i++) {
            String host = hosts[i];
            host = host.replaceAll("http://", "").replaceAll("https://", "");
            Assert.isTrue(host.contains(":"), String.format("your host %s format error , Please refer to [ 127.0.0.1:9200 ] ", host));
            httpHosts[i] = new HttpHost(host.split(":")[0], Integer.parseInt(host.split(":")[1]), "http");
        }
        return httpHosts;
    }
}
